# This file is part of cloud-init. See LICENSE file for license information.

import os
from collections import namedtuple

import pytest

import cloudinit.settings
from cloudinit.cmd import clean
from cloudinit.distros import Distro
from cloudinit.stages import Init
from cloudinit.util import ensure_dir, sym_link, write_file
from tests.unittests.helpers import mock, wrap_and_call

# Minimal stand-in for the paths object of Init: only cloud_dir is provided.
MyPaths = namedtuple("MyPaths", ["cloud_dir"])
# Bundle of per-test filesystem locations handed out by the clean_paths
# fixture.
CleanPaths = namedtuple(
    "CleanPaths", "tmpdir cloud_dir clean_dir log output_log"
)


@pytest.fixture(scope="function")
def clean_paths(tmpdir):
    """Return a CleanPaths tuple of locations rooted in this test's tmpdir.

    Nothing is created on disk here; each entry is only a path object
    joined onto tmpdir.
    """
    leaf_names = {
        "cloud_dir": "varlibcloud",
        "clean_dir": "clean.d",
        "log": "cloud-init.log",
        "output_log": "cloud-init-output.log",
    }
    joined = {field: tmpdir.join(leaf) for field, leaf in leaf_names.items()}
    return CleanPaths(tmpdir=tmpdir, **joined)


@pytest.fixture(scope="function")
def init_class(clean_paths):
    """Return a Mock specced on Init, wired to the clean_paths locations.

    The mock gets a cfg naming the temporary log files, a paths object
    whose cloud_dir is the temporary cloud dir (with a trailing slash) and
    the shutdown_command function of Distro on its distro attribute.
    """
    log_cfg = {
        "def_log_file": clean_paths.log,
        "output": {"all": f"|tee -a {clean_paths.output_log}"},
    }
    fake_init = mock.Mock(spec=Init)
    fake_init.cfg = log_cfg
    fake_init.distro.shutdown_command = Distro.shutdown_command
    fake_init.paths = MyPaths(cloud_dir=f"{clean_paths.cloud_dir}/")
    return fake_init


class TestClean:
    """Tests for remove_artifacts, handle_clean_args and main of
    cloudinit.cmd.clean."""

    @staticmethod
    def _write_generated_configs(root, config_files):
        """Create a placeholder file under root for each generated config.

        Entries containing a "*" glob are written as an interface-specific
        "eth0" file, so no test creates a file whose name is a literal glob
        pattern.

        :param root: py.path.local directory the config paths are re-rooted
            under.
        :param config_files: Iterable of config path strings, such as
            clean.GEN_NET_CONFIG_FILES.
        :return: Tuple (patterns, created). patterns holds the re-rooted
            path strings, suitable for patching into cloudinit.cmd.clean;
            created holds the path strings of the files actually written.
        """
        patterns = []
        created = []
        for conf_file in config_files:
            pattern = root.join(conf_file).strpath
            # Expand glob path to interface-specific eth0 filename
            conf_path = pattern.replace("*", "eth0")
            assert (
                os.path.exists(conf_path) is False
            ), f"Unexpected {conf_path}"
            ensure_dir(os.path.dirname(conf_path))
            write_file(
                conf_path, f"#generated by cloud-init test {conf_path}"
            )
            patterns.append(pattern)
            created.append(conf_path)
        return patterns, created

    @staticmethod
    def _clean_args(**overrides):
        """Build the parsed-arguments object passed to handle_clean_args.

        Every option defaults to off/empty; keyword overrides replace the
        defaults.
        """
        values = {
            "remove_logs": False,
            "remove_seed": False,
            "remove_config": [],
            "reboot": False,
            "machine_id": False,
        }
        values.update(overrides)
        return namedtuple("myargs", list(values))(**values)

    def test_remove_artifacts_removes_logs(self, clean_paths, init_class):
        """remove_artifacts removes logs when remove_logs is True."""
        clean_paths.log.write("cloud-init-log")
        clean_paths.output_log.write("cloud-init-output-log")

        assert (
            os.path.exists(clean_paths.cloud_dir) is False
        ), "Unexpected cloud_dir"
        retcode = clean.remove_artifacts(
            init=init_class,
            remove_logs=True,
        )
        assert (
            clean_paths.log.exists() is False
        ), f"Unexpected file {clean_paths.log}"
        assert (
            clean_paths.output_log.exists() is False
        ), f"Unexpected file {clean_paths.output_log}"
        assert 0 == retcode

    def test_remove_net_conf(self, clean_paths, init_class):
        """remove_config=["network"] removes generated network configs."""
        patterns, created = self._write_generated_configs(
            clean_paths.tmpdir, clean.GEN_NET_CONFIG_FILES
        )

        with mock.patch("cloudinit.cmd.clean.GEN_NET_CONFIG_FILES", patterns):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
                remove_config=["network"],
            )

        for conf_path in created:
            assert (
                os.path.exists(conf_path) is False
            ), f"Unexpected file {conf_path}"
        assert 0 == retcode

    def test_remove_ssh_conf(self, clean_paths, init_class):
        """remove_config=["ssh_config"] removes generated ssh configs."""
        patterns, created = self._write_generated_configs(
            clean_paths.tmpdir, clean.GEN_SSH_CONFIG_FILES
        )

        with mock.patch("cloudinit.cmd.clean.GEN_SSH_CONFIG_FILES", patterns):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
                remove_config=["ssh_config"],
            )

        for conf_path in created:
            assert (
                os.path.exists(conf_path) is False
            ), f"Unexpected file {conf_path}"
        assert 0 == retcode

    def test_remove_all_conf(self, clean_paths, init_class):
        """remove_config=["all"] removes both network and ssh configs."""
        net_patterns, net_created = self._write_generated_configs(
            clean_paths.tmpdir, clean.GEN_NET_CONFIG_FILES
        )
        ssh_patterns, ssh_created = self._write_generated_configs(
            clean_paths.tmpdir, clean.GEN_SSH_CONFIG_FILES
        )

        with mock.patch(
            "cloudinit.cmd.clean.GEN_NET_CONFIG_FILES", net_patterns
        ), mock.patch(
            "cloudinit.cmd.clean.GEN_SSH_CONFIG_FILES", ssh_patterns
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
                remove_config=["all"],
            )

        for conf_path in net_created + ssh_created:
            assert (
                os.path.exists(conf_path) is False
            ), f"file {conf_path} exists!"
        assert 0 == retcode

    def test_keep_net_conf(self, clean_paths, init_class):
        """An empty remove_config leaves generated network configs alone."""
        patterns, created = self._write_generated_configs(
            clean_paths.tmpdir, clean.GEN_NET_CONFIG_FILES
        )

        with mock.patch("cloudinit.cmd.clean.GEN_NET_CONFIG_FILES", patterns):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
                remove_config=[],
            )

        for conf_path in created:
            assert (
                os.path.exists(conf_path) is True
            ), f"file {conf_path} removed!"
        assert 0 == retcode

    @pytest.mark.allow_all_subp
    def test_remove_artifacts_runparts_clean_d(self, clean_paths, init_class):
        """remove_artifacts performs runparts on CLEAN_RUNPARTS_DIR"""
        ensure_dir(clean_paths.cloud_dir)
        artifact_file = clean_paths.tmpdir.join("didit")
        ensure_dir(clean_paths.clean_dir)
        assert artifact_file.exists() is False, f"Unexpected {artifact_file}"
        # The script leaves a marker file behind, proving that it was run.
        clean_script = clean_paths.clean_dir.join("1.sh")
        clean_script.write(f"#!/bin/sh\ntouch {artifact_file}\n")
        clean_script.chmod(mode=0o755)
        with mock.patch.object(
            cloudinit.settings, "CLEAN_RUNPARTS_DIR", clean_paths.clean_dir
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
            )
        assert (
            artifact_file.exists() is True
        ), f"Missing expected {artifact_file}"
        assert 0 == retcode

    def test_remove_artifacts_preserves_logs(self, clean_paths, init_class):
        """remove_artifacts leaves logs when remove_logs is False."""
        clean_paths.log.write("cloud-init-log")
        clean_paths.output_log.write("cloud-init-output-log")

        retcode = clean.remove_artifacts(
            init_class,
            remove_logs=False,
        )
        assert 0 == retcode
        assert (
            clean_paths.log.exists() is True
        ), f"Missing expected file {clean_paths.log}"
        assert (
            clean_paths.output_log.exists() is True
        ), f"Missing expected file {clean_paths.output_log}"

    def test_remove_artifacts_removes_unlinks_symlinks(
        self, clean_paths, init_class
    ):
        """remove_artifacts cleans artifacts dir unlinking any symlinks."""
        dir1 = clean_paths.cloud_dir.join("dir1")
        ensure_dir(dir1)
        symlink = clean_paths.cloud_dir.join("mylink")
        sym_link(dir1.strpath, symlink.strpath)

        with mock.patch.object(
            cloudinit.settings, "CLEAN_RUNPARTS_DIR", clean_paths.clean_dir
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
            )
        assert 0 == retcode
        for path in (dir1, symlink):
            assert path.exists() is False, f"Unexpected {path} found"

    def test_remove_artifacts_removes_artifacts_skipping_seed(
        self, clean_paths, init_class
    ):
        """remove_artifacts cleans artifacts dir with exception of seed dir."""
        dirs = [
            clean_paths.cloud_dir,
            clean_paths.cloud_dir.join("seed"),
            clean_paths.cloud_dir.join("dir1"),
            clean_paths.cloud_dir.join("dir2"),
        ]
        for _dir in dirs:
            ensure_dir(_dir)

        with mock.patch.object(
            cloudinit.settings, "CLEAN_RUNPARTS_DIR", clean_paths.clean_dir
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
            )
        assert 0 == retcode
        # cloud_dir itself and its seed subdir survive; the rest is deleted.
        for expected_dir in dirs[:2]:
            assert expected_dir.exists() is True, f"Missing {expected_dir}"
        for deleted_dir in dirs[2:]:
            assert deleted_dir.exists() is False, f"Unexpected {deleted_dir}"

    def test_remove_artifacts_removes_artifacts_removes_seed(
        self, clean_paths, init_class
    ):
        """remove_artifacts removes seed dir when remove_seed is True."""
        dirs = [
            clean_paths.cloud_dir,
            clean_paths.cloud_dir.join("seed"),
            clean_paths.cloud_dir.join("dir1"),
            clean_paths.cloud_dir.join("dir2"),
        ]
        for _dir in dirs:
            ensure_dir(_dir)

        with mock.patch.object(
            cloudinit.settings, "CLEAN_RUNPARTS_DIR", clean_paths.clean_dir
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
                remove_seed=True,
            )
        assert 0 == retcode
        assert (
            clean_paths.cloud_dir.exists() is True
        ), f"Missing dir {clean_paths.cloud_dir}"
        for deleted_dir in dirs[1:]:
            assert (
                deleted_dir.exists() is False
            ), f"Unexpected {deleted_dir} dir"

    def test_remove_artifacts_returns_one_on_errors(
        self, clean_paths, init_class, capsys
    ):
        """remove_artifacts returns non-zero on failure and prints an error."""
        ensure_dir(clean_paths.cloud_dir)
        ensure_dir(clean_paths.cloud_dir.join("dir1"))

        with mock.patch(
            "cloudinit.cmd.clean.del_dir", side_effect=OSError("oops")
        ):
            retcode = clean.remove_artifacts(
                init_class,
                remove_logs=False,
            )
        assert 1 == retcode
        _out, err = capsys.readouterr()
        assert (
            f"Error:\nCould not remove {clean_paths.cloud_dir}/dir1: oops\n"
            == err
        )

    def test_handle_clean_args_reboots(self, init_class):
        """handle_clean_args_reboots when reboot arg is provided."""

        called_cmds = []

        def fake_subp(cmd, capture):
            # Record the command instead of executing it.
            called_cmds.append(cmd)
            return "", ""

        cmdargs = self._clean_args(reboot=True)
        retcode = wrap_and_call(
            "cloudinit.cmd.clean",
            {
                "subp": {"side_effect": fake_subp},
                "Init": {"return_value": init_class},
            },
            clean.handle_clean_args,
            name="does not matter",
            args=cmdargs,
        )
        assert 0 == retcode
        assert [["shutdown", "-r", "now"]] == called_cmds

    @pytest.mark.parametrize(
        "machine_id,systemd_val",
        (
            pytest.param(True, True, id="machine_id_on_systemd_uninitialized"),
            pytest.param(
                True, False, id="machine_id_non_systemd_removes_file"
            ),
            pytest.param(False, False, id="no_machine_id_param_file_remains"),
        ),
    )
    @mock.patch("cloudinit.cmd.clean.uses_systemd")
    def test_handle_clean_args_removed_machine_id(
        self, uses_systemd, machine_id, systemd_val, clean_paths, init_class
    ):
        """handle_clean_args resets or removes machine-id when arg is True.

        With systemd the file content becomes "uninitialized"; without
        systemd the file is removed. When the arg is False the file is left
        untouched.
        """
        uses_systemd.return_value = systemd_val
        cmdargs = self._clean_args(machine_id=machine_id)
        machine_id_path = clean_paths.tmpdir.join("machine-id")
        machine_id_path.write("SOME-AMAZN-MACHINE-ID")
        with mock.patch.object(
            cloudinit.settings, "CLEAN_RUNPARTS_DIR", clean_paths.clean_dir
        ), mock.patch.object(
            clean, "ETC_MACHINE_ID", machine_id_path.strpath
        ), mock.patch(
            "cloudinit.cmd.clean.Init", return_value=init_class
        ):
            assert 0 == clean.handle_clean_args(
                name="does not matter",
                args=cmdargs,
            )
        if systemd_val:
            if machine_id:
                assert "uninitialized\n" == machine_id_path.read()
            else:
                assert "SOME-AMAZN-MACHINE-ID" == machine_id_path.read()
        else:
            assert machine_id_path.exists() is (not machine_id)

    def test_status_main(self, clean_paths, init_class):
        """clean.main can be run as a standalone script."""
        clean_paths.log.write("cloud-init-log")
        with pytest.raises(SystemExit) as context_manager:
            wrap_and_call(
                "cloudinit.cmd.clean",
                {
                    "Init": {"return_value": init_class},
                    "sys.argv": {"new": ["clean", "--logs"]},
                },
                clean.main,
            )
        assert 0 == context_manager.value.code
        assert (
            clean_paths.log.exists() is False
        ), f"Unexpected log {clean_paths.log}"
